package com.xiaofan.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
 * 流处理 WordCount
 *
 * 并行度的优先级问题：
 * 1. 算子上的并行度
 * 2. 代码全局并行度
 * 3. job提交时指定的并行度
 * 4. 集群默认配置的并行度（开发环境，idea中和电脑的cpu核数相关）
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = parameterTool.get("host")
    val port: Int = parameterTool.getInt("port")

    // 针对当前的执行环境设置并行度
    // env.setParallelism(16)
    // 全局禁用 Chain 链条
    // env.disableOperatorChaining()

    val data: DataStream[String] = env.socketTextStream(host, port).slotSharingGroup("a")

    val result: DataStream[(String, Int)] = data
      .flatMap(_.split(" ")).slotSharingGroup("b")
      .filter(_.nonEmpty).slotSharingGroup("a") // 断开前后的chain链条  .disableChaining()
      .map((_, 1)) // 重新开启一个chain链条 .startNewChain()
      .keyBy(_._1)
      .sum(1)

    result.print().setParallelism(1)

    env.execute("stream word count")


  }
}

/*
  env.disableOperatorChaining() // 全局禁用链条
  .filter(_.nonEmpty).disableChaining() // 断开前后的chain链条
  .map((_, 1)).startNewChain()        // 重新开启一个chain链条,断开前面的
  断开链条的最终目的：不想共用slot，但是有可能再次共用了
  .flatMap(_.split(" ")).slotSharingGroup("b") // slot共享组
 */
